Retracing My Java Journey, Day 13: The Concurrency Bug That Brought the System Down, and How a Deadlock Taught Me Multithreading
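In 2018, during a major e-commerce promotion, I wrote a "perfect" piece of inventory-deduction code. Five minutes after it went live, the system froze completely. When I showed the thread dump to a senior architect, he pointed at a deadlock graph and said: "You lock the inventory table and the order table at the same time, but in opposite orders." In that moment I understood that multithreading is not syntactic sugar; it is the heartbeat of a distributed system.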
Opening: The "Perfect Code" That Froze the System in 5 Minutes
The incident

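```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class InventoryService {
    private final Map<String, Integer> inventory = new ConcurrentHashMap<>();
    private final Map<String, Object> locks = new ConcurrentHashMap<>();

    public boolean deductInventory(String sku, int quantity) {
        // One lock object per SKU, created on first use
        synchronized (locks.computeIfAbsent(sku, k -> new Object())) {
            Integer current = inventory.get(sku);
            if (current == null || current < quantity) {
                return false;
            }
            inventory.put(sku, current - quantity);
            return true;
        }
    }
}

public class OrderService {
    private final Object orderLock = new Object();
    private final InventoryService inventoryService;

    public OrderService(InventoryService inventoryService) {
        this.inventoryService = inventoryService;
    }

    public boolean createOrder(String userId, Map<String, Integer> items) {
        synchronized (orderLock) {
            // Step 1: reserve the inventory
            for (Map.Entry<String, Integer> entry : items.entrySet()) {
                if (!inventoryService.deductInventory(entry.getKey(), entry.getValue())) {
                    return false; // SKUs already deducted earlier in this loop are never restored!
                }
            }
            // Step 2: create the order (assume this takes a while)
            try {
                Thread.sleep(100); // simulates the database call
                createOrderInDatabase(userId, items);
            } catch (InterruptedException e) {
                // Restore the inventory? There is no rollback logic here!
                Thread.currentThread().interrupt();
                return false;
            }
            return true;
        }
    }

    private void createOrderInDatabase(String userId, Map<String, Integer> items) {
        // placeholder for the real persistence code
    }
}

// The question: what if two threads create orders that contain the same SKUs?
// Thread A: creates order 1 with SKUs X and Y
// Thread B: creates order 2 with SKUs Y and X
//
// A deadlock needs each thread to hold one lock while waiting for another.
// In this simplified snippet, orderLock is shared and each SKU lock is released as soon
// as deductInventory returns, so orders are serialized instead (still a bottleneck:
// sleep(100) under one global lock allows only about 10 orders per second per instance).
// Once SKU locks stay held while the next one is acquired, the interleaving becomes:
// Thread A: locks SKU X -> tries to lock SKU Y (held by thread B)
// Thread B: locks SKU Y -> tries to lock SKU X (held by thread A)
//
// Classic deadlock! And with no timeout anywhere, the system hangs forever.
```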
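To watch the interleaving described in those comments actually happen, here is a minimal, self-contained reproduction. It is a sketch and not the original production code. Two threads take two plain Object monitors in opposite orders, standing in for the locks of SKU X and SKU Y. ThreadMXBean.findDeadlockedThreads() then reports the cycle, much as the thread dump did. The class name, the latch handshake and the roughly 5-second polling window are illustrative choices.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.concurrent.CountDownLatch;

class DeadlockRepro {
    // Two monitors standing in for the per-SKU locks of SKU X and SKU Y
    private static final Object SKU_X = new Object();
    private static final Object SKU_Y = new Object();

    /** Starts two threads that lock X and Y in opposite orders; returns how many threads the JVM reports as deadlocked. */
    static int reproduce() {
        // Makes both threads hold their first lock before either asks for its second
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);
        Thread a = new Thread(() -> lockInOrder(SKU_X, SKU_Y, bothHoldFirstLock), "order-A");
        Thread b = new Thread(() -> lockInOrder(SKU_Y, SKU_X, bothHoldFirstLock), "order-B");
        a.setDaemon(true); // daemon threads, so the stuck pair does not keep the JVM alive
        b.setDaemon(true);
        a.start();
        b.start();

        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        try {
            // Poll for up to ~5 seconds until the JVM reports the lock cycle
            for (int i = 0; i < 500; i++) {
                long[] deadlocked = mx.findDeadlockedThreads(); // null when there is no deadlock
                if (deadlocked != null) {
                    return deadlocked.length;
                }
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return 0;
    }

    private static void lockInOrder(Object first, Object second, CountDownLatch latch) {
        synchronized (first) {
            latch.countDown();
            try {
                latch.await(); // wait until the other thread holds its first lock
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            synchronized (second) {
                // never reached: the other thread holds `second` and waits for `first`
            }
        }
    }
}
```

Calling DeadlockRepro.reproduce() should report both threads. Nothing in this code ever times out, which is exactly why the original incident needed a restart.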
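The cost
- Five minutes into the promotion, the system stopped responding entirely
- Roughly 1,000 orders were lost per minute
- Customer-service phone lines were flooded with complaints
- Every service node had to be restarted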
1. Thread Basics: From "new Thread().start()" to "Structured Concurrency"
1.1 The evolution of thread creation

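Generation 1: Extending Thread directly (the beginner's choice)
```java
// Drawback: Java has single inheritance, so MyThread can no longer extend anything else
class MyThread extends Thread {
    @Override
    public void run() {
        System.out.println("I'm running in a thread!");
    }
}

// Usage
MyThread t = new MyThread();
t.start(); // start(), not run(): run() would just execute on the calling thread
```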
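Generation 2: Implementing Runnable (the industrial-strength choice)
```java
// Better separation of concerns: the task is decoupled from the thread that runs it
class MyTask implements Runnable {
    private final String taskName;

    MyTask(String name) {
        this.taskName = name;
    }

    @Override
    public void run() {
        System.out.println("Running task: " + taskName);
    }
}

// Usage: the same task object can be reused
MyTask task = new MyTask("data processing");
new Thread(task).start();
new Thread(task).start(); // two threads run the same task; safe here because MyTask is immutable
```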
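Generation 3: Callable + Future (with a return value)
```java
import java.util.concurrent.*;

// Data, Result and processData() stand in for your own types and logic
class DataProcessor implements Callable<Result> {
    private final Data data;

    DataProcessor(Data data) {
        this.data = data;
    }

    @Override
    public Result call() throws Exception {
        // Process the data; may throw an exception
        return processData(data);
    }
}

// Usage: get the result asynchronously
ExecutorService executor = Executors.newSingleThreadExecutor();
try {
    Future<Result> future = executor.submit(new DataProcessor(data));
    // Block for at most 2 seconds: throws TimeoutException if the result is not ready,
    // and ExecutionException (wrapping the cause) if call() threw
    Result result = future.get(2, TimeUnit.SECONDS);
} finally {
    executor.shutdown(); // otherwise the non-daemon worker thread keeps the JVM running
}
```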
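The snippet above does not show what to do when get() times out. Below is a minimal runnable sketch of one common pattern. String stands in for the placeholder Data/Result types, and Thread.sleep stands in for the work. On timeout, the task is cancelled with interruption, and the executor is always shut down in finally. The class name and the returned strings are illustrative.

```java
import java.util.concurrent.*;

class FutureWithTimeout {
    /** Runs a task that takes workMillis; returns its result, or "timeout" if it exceeds limitMillis. */
    static String runWithTimeout(long workMillis, long limitMillis) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = executor.submit(() -> {
                Thread.sleep(workMillis); // simulated processing
                return "processed";
            });
            try {
                return future.get(limitMillis, TimeUnit.MILLISECONDS);
            } catch (TimeoutException e) {
                future.cancel(true); // interrupt the worker: nobody wants the result any more
                return "timeout";
            } catch (ExecutionException e) {
                return "failed: " + e.getCause(); // call() threw; the original exception is the cause
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        } finally {
            executor.shutdownNow(); // always release the worker thread
        }
    }
}
```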
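Generation 4: CompletableFuture (asynchronous, reactive-style composition)
```java
// Introduced in Java 8, with a functional-style API.
// validateRequest, checkInventory, deductInventory, saveOrder and recoverInventory are your
// own methods; OrderException is assumed to be an unchecked exception.
// Without an explicit Executor, the *Async stages run on ForkJoinPool.commonPool().
public CompletableFuture<Order> createOrderAsync(OrderRequest request) {
    return CompletableFuture.supplyAsync(() -> {
                // Validate the request
                validateRequest(request);
                return request;
            })
            .thenApplyAsync(validatedRequest -> {
                // Check inventory
                return checkInventory(validatedRequest);
            })
            .thenApplyAsync(inventoryChecked -> {
                // Deduct inventory
                return deductInventory(inventoryChecked);
            })
            .thenApplyAsync(inventoryDeducted -> {
                // Create the order
                return saveOrder(inventoryDeducted);
            })
            .exceptionally(ex -> {
                // Error handling: restore inventory. This also runs when validation or the
                // stock check failed before anything was deducted, so recoverInventory must
                // be safe to call in that case. ex usually arrives wrapped in a CompletionException.
                recoverInventory(request);
                throw new OrderException("Failed to create order", ex);
            });
}
```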
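Two details of that pipeline are easy to get wrong. Exceptions reach exceptionally wrapped in a CompletionException, and compensation should only run if the deduction actually happened. The sketch below makes both visible. The stages, the 100-unit limit and the AtomicBoolean flags are hypothetical stand-ins for real inventory calls. They are not part of the original design.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

class AsyncOrderPipeline {
    /**
     * Hypothetical pipeline: validate -> "deduct" -> "save".
     * The deducted/compensated flags stand in for real inventory side effects.
     */
    static CompletableFuture<String> createOrder(int quantity, AtomicBoolean deducted, AtomicBoolean compensated) {
        return CompletableFuture.supplyAsync(() -> {
                    // Validation stage: fails before anything is deducted
                    if (quantity <= 0) {
                        throw new IllegalArgumentException("quantity must be positive");
                    }
                    return quantity;
                })
                .thenApplyAsync(q -> {
                    deducted.set(true); // stand-in for deducting inventory
                    if (q > 100) {
                        throw new IllegalStateException("saving the order failed");
                    }
                    return "order-" + q;
                })
                .exceptionally(ex -> {
                    // A failure from an earlier stage arrives wrapped in a CompletionException
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause() : ex;
                    // Compensate only if the deduction actually happened
                    if (deducted.get()) {
                        compensated.set(true); // stand-in for restoring inventory
                    }
                    // Rethrowing an unchecked exception makes the returned future fail
                    throw new CompletionException("order creation failed", cause);
                });
    }
}
```

Callers see the failure through join(), which throws the CompletionException whose cause is the original exception.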
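Generation 5: Virtual threads (Java 21+, a revolutionary change)
```java
// Platform threads vs. virtual threads
public void compareThreads() {
    // Platform thread: mapped 1:1 to an OS thread; expensive to create
    // (typically reserves about 1 MB of stack by default)
    Thread platformThread = Thread.ofPlatform()
            .name("platform-", 0)
            .start(() -> {
                System.out.println("I'm a platform thread: " + Thread.currentThread());
            });

    // Virtual thread: M:N-scheduled onto a small set of carrier (platform) threads;
    // very cheap to create (roughly a few hundred bytes, its stack lives on the heap)
    Thread virtualThread = Thread.ofVirtual()
            .name("virtual-", 0)
            .start(() -> {
                System.out.println("I'm a virtual thread: " + Thread.currentThread());
            });

    // Creating a million threads in bulk (generally infeasible with platform threads)
    try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
        for (int i = 0; i < 1_000_000; i++) {
            final int taskId = i; // lambdas may only capture effectively final variables
            executor.submit(() -> {
                Thread.sleep(Duration.ofSeconds(1));
                return taskId;
            });
        }
    } // close() waits for all submitted tasks to finish
}
```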
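1.2 Thread states: the truth from the JVM source
Reading the OpenJDK source helped me understand what each thread state really means:
```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// What each Thread.State constant actually means (simplified view of java.lang.Thread.State)
public enum State {
    // NEW: created but not yet started (start() not called)
    NEW,
    // RUNNABLE: may be running, or may be waiting for a CPU
    // Note: Java folds both "ready" and "running" into RUNNABLE
    RUNNABLE,
    // BLOCKED: waiting for a monitor lock (synchronized), either to enter a synchronized
    // block/method or to re-enter it after Object.wait() returns.
    // A thread waiting for a java.util.concurrent Lock is parked and shows as WAITING instead.
    BLOCKED,
    // WAITING: waiting indefinitely (wait(), join(), LockSupport.park())
    WAITING,
    // TIMED_WAITING: waiting with a time limit (sleep(), wait(timeout), join(timeout), parkNanos())
    TIMED_WAITING,
    // TERMINATED: the thread has finished
    TERMINATED
}

// In practice: monitoring thread states
public class ThreadMonitor {
    public static void analyzeThreadStates() {
        ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
        // Get all thread IDs, then their info including full stack traces
        // (the single-argument getThreadInfo(id) returns no stack frames)
        long[] threadIds = threadBean.getAllThreadIds();
        ThreadInfo[] infos = threadBean.getThreadInfo(threadIds, Integer.MAX_VALUE);
        for (ThreadInfo info : infos) {
            if (info == null) {
                continue; // the thread exited in the meantime
            }
            System.out.printf("Thread: %s (%d)%n", info.getThreadName(), info.getThreadId());
            System.out.printf("  State: %s%n", info.getThreadState());
            // For BLOCKED threads, print the lock details
            if (info.getThreadState() == Thread.State.BLOCKED) {
                System.out.printf("  Waiting for lock: %s%n", info.getLockName());
                System.out.printf("  Held by thread %d%n", info.getLockOwnerId());
            }
            // Print the stack trace
            System.out.println("  Stack:");
            for (StackTraceElement element : info.getStackTrace()) {
                System.out.printf("    %s%n", element);
            }
        }
    }
}
```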
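The BLOCKED comment draws a distinction that is easy to observe directly. Contention on a synchronized monitor shows up as BLOCKED. A thread waiting for a ReentrantLock is parked and shows up as WAITING, which matters when reading thread dumps. A small sketch follows. The polling helper and its roughly 2-second limit are illustrative choices.

```java
import java.util.concurrent.locks.ReentrantLock;

class LockStateDemo {
    /** Polls until t reaches the expected state or ~2 seconds pass; returns the last observed state. */
    static Thread.State awaitState(Thread t, Thread.State expected) {
        for (int i = 0; i < 200 && t.getState() != expected; i++) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return t.getState();
    }

    /** State of a thread waiting for a monitor that the caller holds. */
    static Thread.State stateWhileWaitingForMonitor() {
        Object monitor = new Object();
        synchronized (monitor) {
            Thread t = new Thread(() -> {
                synchronized (monitor) { } // blocks until the caller leaves its synchronized block
            });
            t.start();
            return awaitState(t, Thread.State.BLOCKED);
        } // releasing the monitor lets t finish
    }

    /** State of a thread waiting for a ReentrantLock that the caller holds. */
    static Thread.State stateWhileWaitingForReentrantLock() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();
        try {
            Thread t = new Thread(() -> {
                lock.lock(); // parks until the caller unlocks
                lock.unlock();
            });
            t.start();
            return awaitState(t, Thread.State.WAITING);
        } finally {
            lock.unlock(); // let t acquire the lock and finish
        }
    }
}
```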
2. Thread Synchronization: From "synchronized Everywhere" to "Fine-Grained Lock Control"

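2.1 Misconceptions and truths about synchronized
Misconception 1: a synchronized method guards only that method (in truth it locks the whole object, this)
```java
class Counter {
    private int count;

    // This synchronized method locks the Counter instance (this)
    public synchronized void increment() {
        count++;
    }

    // So does this one
    public synchronized void decrement() {
        count--;
    }
    // Consequence: the two methods exclude each other. While one thread is in increment(),
    // no other thread can enter decrement(). Both modify count, so that is exactly right here.
}

// Separate lock objects help only when the methods guard *independent* state.
// (Splitting the locks around one shared count field would introduce a data race.)
class BetterCounter {
    private final Object incLock = new Object();
    private final Object decLock = new Object();
    private int increments; // guarded by incLock
    private int decrements; // guarded by decLock

    public void increment() {
        synchronized (incLock) {
            increments++;
        }
    }

    public void decrement() {
        synchronized (decLock) {
            decrements++;
        }
    }

    public int value() {
        synchronized (incLock) {
            synchronized (decLock) { // always incLock first, then decLock
                return increments - decrements;
            }
        }
    }
}
```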
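When the shared state really is a single counter, an atomic class removes the need for a lock entirely. Here is a minimal sketch with AtomicInteger. LongAdder is another standard option for heavily contended counters. The thread and iteration counts in the helper are arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class AtomicCounter {
    private final AtomicInteger count = new AtomicInteger();

    void increment() { count.incrementAndGet(); } // lock-free atomic read-modify-write
    void decrement() { count.decrementAndGet(); }
    int get() { return count.get(); }

    /** Runs `threads` threads that each increment `perThread` times, then decrement `perThread / 2` times. */
    static int hammer(int threads, int perThread) {
        AtomicCounter counter = new AtomicCounter();
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            Thread w = new Thread(() -> {
                for (int i = 0; i < perThread; i++) counter.increment();
                for (int i = 0; i < perThread / 2; i++) counter.decrement();
            });
            workers.add(w);
            w.start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return counter.get();
    }
}
```

With 8 threads doing 10,000 increments and 5,000 decrements each, no update is lost, and the result is always 40,000.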
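Misconception 2: static and instance synchronized methods share one lock (in truth a static synchronized method locks the Class object)
```java
import java.util.HashMap;
import java.util.Map;

class GlobalConfig {
    private static final Map<String, String> config = new HashMap<>();

    // This locks the GlobalConfig.class object
    public static synchronized void update(String key, String value) {
        config.put(key, value);
    }
    // Drawback: every static synchronized method of the class shares this one lock, which can
    // become a bottleneck. It does not exclude synchronized instance methods, which lock `this`.

    // The truth: the static synchronized method above is equivalent to
    public static void updateExplicit(String key, String value) {
        synchronized (GlobalConfig.class) {
            config.put(key, value);
        }
    }
}
```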
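2.2 The Lock interface: more powerful than synchronized, and more dangerous
Basic use of ReentrantLock
```java
import java.math.BigDecimal;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Account {
    private final Lock lock = new ReentrantLock();
    private BigDecimal balance;

    public void transfer(Account to, BigDecimal amount) {
        // Wrong: this can deadlock. If a.transfer(b) and b.transfer(a) run concurrently,
        // each thread holds its own lock and waits forever for the other's.
        this.lock.lock();
        try {
            to.lock.lock();
            try {
                this.balance = this.balance.subtract(amount);
                to.balance = to.balance.add(amount);
            } finally {
                to.lock.unlock();
            }
        } finally {
            this.lock.unlock();
        }
    }
}
```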
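Deadlock fix 1: acquire the locks in a fixed order
```java
import java.math.BigDecimal;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Account {
    private final long id; // unique ID, used to order lock acquisition
    private final Lock lock = new ReentrantLock();
    private BigDecimal balance;

    public Account(long id, BigDecimal balance) {
        this.id = id;
        this.balance = balance;
    }

    public void transfer(Account to, BigDecimal amount) {
        // Acquire the locks in ascending ID order, so every thread uses the same order
        Account first = this.id < to.id ? this : to;
        Account second = this.id < to.id ? to : this;
        first.lock.lock();
        try {
            second.lock.lock();
            try {
                if (this.balance.compareTo(amount) < 0) {
                    throw new InsufficientBalanceException(); // your own unchecked exception
                }
                this.balance = this.balance.subtract(amount);
                to.balance = to.balance.add(amount);
            } finally {
                second.lock.unlock();
            }
        } finally {
            first.lock.unlock();
        }
    }
}
```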
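A quick way to convince yourself that the ordering works is to run transfers in both directions at once and check that money is conserved. This runnable sketch simplifies the Account above in three ways. Balances are long cents instead of BigDecimal, and insufficient funds throws IllegalStateException. OrderedAccount and stress are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class OrderedAccount {
    private final long id;
    private final Lock lock = new ReentrantLock();
    private long balanceCents;

    OrderedAccount(long id, long balanceCents) {
        this.id = id;
        this.balanceCents = balanceCents;
    }

    void transfer(OrderedAccount to, long amount) {
        // Always lock the lower id first, so all threads share one global lock order
        OrderedAccount first = this.id < to.id ? this : to;
        OrderedAccount second = this.id < to.id ? to : this;
        first.lock.lock();
        try {
            second.lock.lock();
            try {
                if (balanceCents < amount) throw new IllegalStateException("insufficient balance");
                balanceCents -= amount;
                to.balanceCents += amount;
            } finally {
                second.lock.unlock();
            }
        } finally {
            first.lock.unlock();
        }
    }

    long balance() {
        lock.lock();
        try {
            return balanceCents;
        } finally {
            lock.unlock();
        }
    }

    /** Even-numbered threads move 1 cent A->B, odd ones B->A; returns the final total of both accounts. */
    static long stress(int threads, int transfersPerThread) {
        OrderedAccount a = new OrderedAccount(1, 1_000_000);
        OrderedAccount b = new OrderedAccount(2, 1_000_000);
        List<Thread> workers = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            boolean aToB = t % 2 == 0;
            Thread w = new Thread(() -> {
                for (int i = 0; i < transfersPerThread; i++) {
                    if (aToB) a.transfer(b, 1); else b.transfer(a, 1);
                }
            });
            workers.add(w);
            w.start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return a.balance() + b.balance();
    }
}
```

With the naive lock-this-then-that version, the same stress run can hang forever. With ordered locking, it finishes, and the total stays at 2,000,000 cents.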
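Deadlock fix 2: tryLock with a timeout
```java
import java.math.BigDecimal;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Account {
    private final Lock lock = new ReentrantLock();
    private BigDecimal balance;

    public boolean tryTransfer(Account to, BigDecimal amount, long timeout, TimeUnit unit) {
        long stopTime = System.nanoTime() + unit.toNanos(timeout);
        while (true) {
            if (this.lock.tryLock()) {
                try {
                    if (to.lock.tryLock()) {
                        try {
                            // Check the balance
                            if (this.balance.compareTo(amount) < 0) {
                                return false;
                            }
                            // Perform the transfer
                            this.balance = this.balance.subtract(amount);
                            to.balance = to.balance.add(amount);
                            return true;
                        } finally {
                            to.lock.unlock();
                        }
                    }
                } finally {
                    this.lock.unlock();
                }
            }
            // Give up once the deadline has passed (compare nanoTime values by difference)
            if (System.nanoTime() - stopTime > 0) {
                return false;
            }
            // Sleep a random 10-19 ms so two threads don't keep colliding (avoids livelock)
            try {
                Thread.sleep(10 + (long) (Math.random() * 10));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```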
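2.3 Read-write locks: a performance win for read-mostly workloads
```java
import java.math.BigDecimal;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

public class PriceCache {
    private final Map<String, BigDecimal> cache = new HashMap<>(); // guarded by rwLock
    private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
    private final Lock readLock = rwLock.readLock();
    private final Lock writeLock = rwLock.writeLock();

    // Reads: many threads can read at the same time
    public BigDecimal getPrice(String productId) {
        readLock.lock();
        try {
            return cache.get(productId);
        } finally {
            readLock.unlock();
        }
    }

    // Writes: only one thread at a time, and no readers meanwhile
    public void updatePrice(String productId, BigDecimal price) {
        writeLock.lock();
        try {
            cache.put(productId, price);
        } finally {
            writeLock.unlock();
        }
    }

    // Special case: "upgrading" a read lock to a write lock (easy to deadlock, be careful).
    // ReentrantReadWriteLock cannot upgrade: writeLock.lock() while holding the read lock blocks forever.
    public BigDecimal updateIfPresent(String productId, Function<BigDecimal, BigDecimal> updater) {
        readLock.lock();
        try {
            BigDecimal current = cache.get(productId);
            if (current == null) {
                return null;
            }
            // So we must release the read lock before taking the write lock
            readLock.unlock();
            writeLock.lock();
            try {
                // Re-check: another thread may have changed the entry in between
                current = cache.get(productId);
                if (current == null) {
                    return null;
                }
                BigDecimal updated = updater.apply(current);
                cache.put(productId, updated);
                return updated;
            } finally {
                // Downgrade: take the read lock again before releasing the write lock (allowed),
                // so that the outer finally's readLock.unlock() stays balanced
                readLock.lock();
                writeLock.unlock();
            }
        } finally {
            readLock.unlock();
        }
    }
}
```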
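This is why the update path must release the read lock first. ReentrantReadWriteLock allows downgrading (a thread holding the write lock takes the read lock) but not upgrading. The sketch uses tryLock(), so the failed upgrade returns false instead of hanging. The class and method names are illustrative.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

class RwUpgradeDemo {
    /** Holding the read lock, can the same thread also get the write lock? */
    static boolean canUpgrade() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.readLock().lock();
        try {
            boolean got = rw.writeLock().tryLock(); // lock() here would block forever
            if (got) rw.writeLock().unlock();
            return got;
        } finally {
            rw.readLock().unlock();
        }
    }

    /** Holding the write lock, can the same thread also take the read lock? */
    static boolean canDowngrade() {
        ReentrantReadWriteLock rw = new ReentrantReadWriteLock();
        rw.writeLock().lock();
        boolean got = rw.readLock().tryLock(); // allowed: this is lock downgrading
        rw.writeLock().unlock();               // now only the read lock is held
        if (got) rw.readLock().unlock();
        return got;
    }
}
```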
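2.4 StampedLock: Java 8's performance beast
```java
import java.util.concurrent.locks.StampedLock;

public class Point {
    private double x, y;
    // Note: StampedLock is not reentrant; never lock it again while already holding a stamp
    private final StampedLock lock = new StampedLock();

    // Optimistic read: the fastest path, but the values read can only be trusted
    // once validate() confirms that no write happened in between
    public double distanceFromOrigin() {
        // 1. Try an optimistic read (takes no lock)
        long stamp = lock.tryOptimisticRead();
        double currentX = x, currentY = y;
        // 2. Check whether a write happened while we were reading
        if (!lock.validate(stamp)) {
            // 3. If so, fall back to a pessimistic read lock and read again
            stamp = lock.readLock();
            try {
                currentX = x;
                currentY = y;
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return Math.sqrt(currentX * currentX + currentY * currentY);
    }

    // Write
    public void move(double deltaX, double deltaY) {
        long stamp = lock.writeLock();
        try {
            x += deltaX;
            y += deltaY;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    // Upgrading a read lock to a write lock
    public void moveIfAt(double oldX, double oldY, double newX, double newY) {
        // Take a pessimistic read lock
        long stamp = lock.readLock();
        try {
            while (x == oldX && y == oldY) {
                // Try to convert it into a write lock
                long ws = lock.tryConvertToWriteLock(stamp);
                if (ws != 0L) {
                    stamp = ws; // converted: keep the new stamp
                    x = newX;
                    y = newY;
                    break;
                } else {
                    // Conversion failed: release the read lock and take the write lock outright
                    lock.unlockRead(stamp);
                    stamp = lock.writeLock();
                }
            }
        } finally {
            lock.unlock(stamp); // releases whichever lock we hold now (read or write)
        }
    }
}
```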
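Two StampedLock behaviors are worth checking directly. First, an optimistic stamp stops validating once any write lock has been taken. Second, the lock is not reentrant, so a thread that already holds the write lock cannot take it again. The sketch uses tryWriteLock(), so the reentrancy check fails fast instead of self-deadlocking. The class and method names are illustrative.

```java
import java.util.concurrent.locks.StampedLock;

class StampedLockDemo {
    /** Returns {stamp valid before any write, stamp valid after a write}. */
    static boolean[] optimisticValidity() {
        StampedLock lock = new StampedLock();
        long stamp = lock.tryOptimisticRead();
        boolean before = lock.validate(stamp);
        long ws = lock.writeLock();
        lock.unlockWrite(ws);
        boolean after = lock.validate(stamp); // a write happened since the stamp was issued
        return new boolean[] { before, after };
    }

    /** StampedLock is not reentrant: a second write-lock attempt by the same thread fails. */
    static boolean isReentrant() {
        StampedLock lock = new StampedLock();
        long ws = lock.writeLock();
        try {
            long again = lock.tryWriteLock(); // returns 0 when the lock is not available
            if (again != 0L) lock.unlockWrite(again);
            return again != 0L;
        } finally {
            lock.unlockWrite(ws);
        }
    }
}
```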
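3. Inter-Thread Communication: From wait/notify to Message Queues
3.1 The classic wait/notify pattern
```java
import java.util.LinkedList;
import java.util.Queue;

// Classic producer-consumer implementation
// (a home-made class, unrelated to java.util.concurrent.BlockingQueue)
public class BlockingQueue<T> {
    private final Queue<T> queue = new LinkedList<>();
    private final int capacity;

    public BlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    public synchronized void put(T item) throws InterruptedException {
        while (queue.size() == capacity) {
            wait(); // queue full: wait (in a loop, to survive spurious wakeups)
        }
        queue.add(item);
        notifyAll(); // tell consumers there may be data now
    }

    public synchronized T take() throws InterruptedException {
        while (queue.isEmpty()) {
            wait(); // queue empty: wait
        }
        T item = queue.remove();
        notifyAll(); // tell producers there may be room now
        return item;
    }
}
// Drawback: notifyAll() wakes every waiting thread, producers and consumers alike, and most of
// them immediately go back to waiting (the "thundering herd" effect). Plain notify() is not a
// safe fix: producers and consumers share one wait set, so it could wake the wrong kind of thread.
```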
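3.2 The Condition interface: more precise signaling between threads
```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionBlockingQueue<T> {
    private final Queue<T> queue = new LinkedList<>();
    private final int capacity;
    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();  // "queue is not full" condition
    private final Condition notEmpty = lock.newCondition(); // "queue is not empty" condition

    public ConditionBlockingQueue(int capacity) {
        this.capacity = capacity;
    }

    public void put(T item) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == capacity) {
                notFull.await(); // wait for "not full"
            }
            queue.add(item);
            notEmpty.signal(); // wake exactly one consumer waiting for "not empty"
        } finally {
            lock.unlock();
        }
    }

    public T take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty()) {
                notEmpty.await(); // wait for "not empty"
            }
            T item = queue.remove();
            notFull.signal(); // wake exactly one producer waiting for "not full"
            return item;
        } finally {
            lock.unlock();
        }
    }
}
```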
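3.3 Using BlockingQueue: a gift from the Java concurrency library
```java
import java.util.concurrent.*;

// In most cases you should simply use Java's built-in concurrent containers
// (Task and processTask() stand in for your own type and logic)
public class ProducerConsumer {
    private final BlockingQueue<Task> taskQueue = new LinkedBlockingQueue<>(100);
    private final ExecutorService executor = Executors.newFixedThreadPool(10);

    // Producer
    public void produce(Task task) throws InterruptedException {
        // Blocks while the queue is full, until space frees up
        taskQueue.put(task);
    }

    // Consumers
    public void startConsumers() {
        for (int i = 0; i < 10; i++) {
            executor.submit(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    try {
                        // Blocks while the queue is empty, until a task arrives
                        Task task = taskQueue.take();
                        processTask(task);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            });
        }
    }

    // Stopping: shutdownNow() interrupts the consumers, which ends their loops
    public void stop() {
        executor.shutdownNow();
    }
}
```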
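The consumer loop above only exits when its thread is interrupted, for example through executor.shutdownNow(). Another common shutdown pattern is the poison pill: a sentinel value that tells each consumer to stop once the real work has been drained. Below is a runnable sketch with Integer items. The sentinel value, the queue capacity and the summing "work" are illustrative.

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicLong;

class PoisonPillDemo {
    private static final Integer POISON = -1; // sentinel meaning "no more work"

    /** Produces 1..n, consumes them with `consumers` threads, and returns the sum processed. */
    static long run(int n, int consumers) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(16); // bounded: the producer blocks when full
        AtomicLong sum = new AtomicLong();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int c = 0; c < consumers; c++) {
            pool.submit(() -> {
                try {
                    while (true) {
                        Integer item = queue.take();
                        if (item.equals(POISON)) {
                            return; // this consumer is done
                        }
                        sum.addAndGet(item);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        try {
            for (int i = 1; i <= n; i++) {
                queue.put(i);
            }
            for (int c = 0; c < consumers; c++) {
                queue.put(POISON); // one pill per consumer, queued after all real items
            }
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sum.get();
    }
}
```

The queue is FIFO, and the pills are enqueued after every real item, so each consumer sees its pill only after all the work ahead of it has been taken.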
4. Thread Pools: From "new Thread()" to Production-Grade Concurrency
4.1 The 7 core parameters of a thread pool

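```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// log: an SLF4J-style logger; OrderTask: your own Runnable with getOrderId()
public class ThreadPoolConfig {
    // A sound thread pool configuration
    public ThreadPoolExecutor createOrderProcessingPool() {
        int corePoolSize = Runtime.getRuntime().availableProcessors(); // number of CPU cores
        int maxPoolSize = corePoolSize * 4; // I/O-bound work can afford more threads
        return new ThreadPoolExecutor(
                corePoolSize,                              // core threads: kept even when idle
                maxPoolSize,                               // maximum threads (added only once the queue is full)
                60L,                                       // keep-alive for idle non-core threads
                TimeUnit.SECONDS,                          // time unit
                new LinkedBlockingQueue<>(1000),           // work queue: bounded, to prevent OOM
                new NamedThreadFactory("order-processor"), // thread factory: named threads
                new OrderRejectionHandler()                // rejection policy: custom handling
        );
    }

    // Thread factory that names its threads (easier monitoring and debugging)
    static class NamedThreadFactory implements ThreadFactory {
        private final String namePrefix;
        private final AtomicInteger threadNumber = new AtomicInteger(1);

        NamedThreadFactory(String namePrefix) {
            this.namePrefix = namePrefix + "-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
            t.setDaemon(false);                  // non-daemon thread
            t.setPriority(Thread.NORM_PRIORITY); // normal priority
            return t;
        }
    }

    // Custom rejection policy
    static class OrderRejectionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // Only execute() hands the original Runnable to this handler; submit() wraps it
            // in a FutureTask, so this instanceof check would never match
            if (r instanceof OrderTask) {
                OrderTask task = (OrderTask) r;
                // 1. Log it
                log.warn("Order task rejected: {}", task.getOrderId());
                // 2. Try to put it back on the queue, waiting up to 1 second
                //    (this blocks the submitting thread; after shutdown a re-queued task might never run)
                try {
                    boolean offered = executor.getQueue().offer(r, 1, TimeUnit.SECONDS);
                    if (!offered) {
                        // 3. Still no room: fall back to the degradation strategy
                        executeFallback(task);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    executeFallback(task);
                }
            } else {
                // Default policy: throw
                throw new RejectedExecutionException("Task rejected");
            }
        }

        private void executeFallback(OrderTask task) {
            // Degradation: e.g. run it synchronously, or persist it for a later retry
            log.error("Order task degraded: {}", task.getOrderId());
            task.run(); // runs on the current thread (note: this blocks the caller!)
        }
    }
}
```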
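The order in which these parameters take effect often surprises people. The pool first grows to corePoolSize, then queues tasks. It creates threads beyond the core size only once the queue is full, and after that it rejects. The sketch uses tiny sizes (core 1, max 2, queue capacity 1). Its tasks block on a latch, so nothing finishes while we inspect the pool. The class name is illustrative.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {
    /** Submits 4 blocking tasks; returns {pool size, queued tasks, rejected tasks}. */
    static int[] submitFour() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.AbortPolicy());
        int rejected = 0;
        try {
            for (int i = 0; i < 4; i++) {
                try {
                    // Task 1 starts the core thread, task 2 is queued,
                    // task 3 finds the queue full and starts a second (non-core) thread
                    pool.execute(() -> {
                        try {
                            release.await(); // keep the worker busy until we are done inspecting
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // task 4: queue full and maximum pool size reached
                }
            }
            return new int[] { pool.getPoolSize(), pool.getQueue().size(), rejected };
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }
}
```

The same logic explains why maxPoolSize in createOrderProcessingPool only matters once its 1,000-slot queue is full.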
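4.2 Choosing a thread pool for different scenarios
```java
import java.util.concurrent.*;

// NamedThreadFactory is the factory defined in ThreadPoolConfig (section 4.1)
public class ThreadPoolSelector {
    // Scenario 1: a fast-responding web service (Tomcat style)
    public ExecutorService createWebServerPool() {
        return new ThreadPoolExecutor(
                100,                                 // large core size to absorb bursts of requests
                200,                                 // large maximum as well
                60L,                                 // idle non-core threads time out after 60 s
                TimeUnit.SECONDS,
                new SynchronousQueue<>(),            // direct hand-off, no queueing
                new NamedThreadFactory("web-worker"),
                new ThreadPoolExecutor.AbortPolicy() // reject immediately: fail fast
        );
    }

    // Scenario 2: batch data processing
    public ExecutorService createBatchProcessingPool() {
        int processors = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                processors,                  // core threads = CPU cores
                processors,                  // with an unbounded queue the pool never grows past the
                                             // core size, so a larger maximum would be ignored
                60L,                         // keep-alive, applied to core threads too (see below)
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(), // unbounded queue: tasks accumulate (watch memory)
                new NamedThreadFactory("batch-worker"),
                new ThreadPoolExecutor.CallerRunsPolicy() // with an unbounded queue, rejection only
                                                          // happens after shutdown, where this policy discards
        );
        pool.allowCoreThreadTimeOut(true); // let idle core threads be reclaimed too (needs keep-alive > 0)
        return pool;
    }

    // Scenario 3: scheduled tasks
    public ScheduledExecutorService createScheduledPool() {
        return new ScheduledThreadPoolExecutor(
                4, // core threads
                new NamedThreadFactory("scheduler"),
                new ThreadPoolExecutor.AbortPolicy()
        );
    }

    // Scenario 4: CPU-bound computation
    public ExecutorService createComputePool() {
        int processors = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
                processors,                      // core threads = CPU cores
                processors,                      // max threads = CPU cores (avoid excess context switching)
                0L,
                TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1000), // bounded queue, so tasks cannot pile up without limit
                new NamedThreadFactory("compute-worker"),
                new ThreadPoolExecutor.CallerRunsPolicy() // when full, the caller runs the task (back-pressure)
        );
    }
}
```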
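A note on idle core threads: ThreadPoolExecutor only reclaims them after an explicit allowCoreThreadTimeOut(true), and that call throws IllegalArgumentException unless the keep-alive time is greater than zero. A keep-alive of 0 by itself does not reclaim core threads. Here is a quick check. The class and method names are illustrative.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreTimeoutDemo {
    /** Returns true if allowCoreThreadTimeOut(true) is accepted with the given keep-alive. */
    static boolean coreTimeoutAccepted(long keepAliveMillis) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 2, keepAliveMillis, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
        try {
            pool.allowCoreThreadTimeOut(true);
            return pool.allowsCoreThreadTimeOut();
        } catch (IllegalArgumentException e) {
            return false; // rejected: core-thread timeout needs a keep-alive greater than zero
        } finally {
            pool.shutdown();
        }
    }
}
```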
4.3 Thread pool monitoring and tuning
```java
import java.lang.management.ManagementFactory;
import java.util.concurrent.*;

// log, recordMetric and HealthCheckResponse stand in for your logging, metrics and health-check APIs
public class ThreadPoolMonitor {
    // Hard upper bound for dynamic scaling (illustrative value)
    private static final int MAX_POOL_CEILING = 256;

    private final ThreadPoolExecutor executor;
    private final ScheduledExecutorService monitor;

    public ThreadPoolMonitor(ThreadPoolExecutor executor) {
        this.executor = executor;
        this.monitor = Executors.newSingleThreadScheduledExecutor();
    }

    public void startMonitoring() {
        // Collect metrics every 5 seconds
        monitor.scheduleAtFixedRate(this::collectMetrics, 0, 5, TimeUnit.SECONDS);
    }

    private void collectMetrics() {
        int poolSize = executor.getPoolSize();
        int activeCount = executor.getActiveCount();
        long completedTaskCount = executor.getCompletedTaskCount();
        int queueSize = executor.getQueue().size();
        int queueCapacity = queueSize + executor.getQueue().remainingCapacity();
        int largestPoolSize = executor.getLargestPoolSize();

        // Derived metrics
        double utilization = poolSize > 0 ? (double) activeCount / poolSize : 0;
        double queueLoad = queueCapacity > 0 ? (double) queueSize / queueCapacity : 0; // share of capacity in use

        // Report to the monitoring system
        recordMetric("threadpool.pool_size", poolSize);
        recordMetric("threadpool.largest_pool_size", largestPoolSize);
        recordMetric("threadpool.active_threads", activeCount);
        recordMetric("threadpool.utilization", utilization);
        recordMetric("threadpool.queue_size", queueSize);
        recordMetric("threadpool.queue_load", queueLoad);
        recordMetric("threadpool.completed_tasks", completedTaskCount);

        // Dynamic resizing (use with care!)
        if (queueLoad > 0.8 && utilization > 0.9) {
            // Queue almost full and threads busy: consider raising the maximum
            int currentMax = executor.getMaximumPoolSize();
            int newMax = Math.min(MAX_POOL_CEILING, currentMax * 2);
            if (newMax > currentMax) {
                executor.setMaximumPoolSize(newMax);
                log.info("Raised maximum pool size to: {}", newMax);
            }
        } else if (queueLoad < 0.2 && utilization < 0.3) {
            // Queue almost empty and threads idle: consider shrinking the core size
            int newCore = Math.max(1, poolSize / 2);
            if (newCore < executor.getCorePoolSize()) {
                executor.setCorePoolSize(newCore);
                log.info("Lowered core pool size to: {}", newCore);
            }
        }
    }

    // Thread pool health check
    public HealthCheckResponse healthCheck() {
        if (executor.isShutdown() || executor.isTerminating() || executor.isTerminated()) {
            return HealthCheckResponse.unhealthy("Thread pool is shut down or shutting down");
        }
        int queueSize = executor.getQueue().size();
        if (queueSize > 1000) { // illustrative threshold
            return HealthCheckResponse.unhealthy("Task queue backlog: " + queueSize);
        }
        // Deadlock check: the JVM reports threads stuck in a cycle on monitors or on
        // java.util.concurrent locks (covers every thread in the JVM, not only this pool)
        long[] deadlocked = ManagementFactory.getThreadMXBean().findDeadlockedThreads();
        if (deadlocked != null) {
            return HealthCheckResponse.unhealthy("Deadlocked threads: " + deadlocked.length);
        }
        return HealthCheckResponse.healthy();
    }
}
```
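5. Hands-On: Refactoring the Inventory Deduction System
Starting from the deadlock in the opening story, I rebuilt the whole inventory system: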

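```java
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Striped;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;

// Striped and RateLimiter come from Guava; RateLimitException and LockTimeoutException
// are assumed to be your own unchecked exceptions
public class InventoryManager {
    // Striped locks: spread a large number of SKUs over 64 reentrant locks to reduce contention
    private final Striped<Lock> locks = Striped.lock(64);
    // Stock per SKU (loaded elsewhere); every write happens while holding the SKU's stripe lock
    private final ConcurrentHashMap<String, AtomicInteger> inventory = new ConcurrentHashMap<>();
    // Rate limiter against sudden traffic spikes
    private final RateLimiter rateLimiter = RateLimiter.create(1000); // 1000 permits per second
    // Runs each inventory operation on its own virtual thread (no pooling needed)
    private final ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();

    public CompletableFuture<Boolean> deductInventoryAsync(String sku, int quantity) {
        return CompletableFuture.supplyAsync(() -> {
            // Rate limiting
            if (!rateLimiter.tryAcquire(1, 10, TimeUnit.MILLISECONDS)) {
                throw new RateLimitException("Too many requests");
            }
            // Acquire this SKU's stripe lock, with a timeout
            Lock lock = locks.get(sku);
            try {
                if (!lock.tryLock(50, TimeUnit.MILLISECONDS)) {
                    throw new LockTimeoutException("Timed out acquiring the inventory lock");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CompletionException(e);
            }
            try {
                AtomicInteger stock = inventory.get(sku);
                if (stock == null) {
                    return false; // unknown SKU: nothing in stock
                }
                // CAS loop (redundant while every writer holds the stripe lock, but harmless)
                while (true) {
                    int current = stock.get();
                    if (current < quantity) {
                        return false; // insufficient stock
                    }
                    if (stock.compareAndSet(current, current - quantity)) {
                        return true; // deducted successfully
                    }
                    // CAS failed: retry
                }
            } finally {
                lock.unlock();
            }
        }, executor);
    }

    // Batch deduction (the key to avoiding deadlock)
    public CompletableFuture<Boolean> batchDeduct(Map<String, Integer> items) {
        return CompletableFuture.supplyAsync(() -> {
            // 1. Get the stripe locks in a canonical order: bulkGet returns them sorted by stripe
            //    index, so every caller locks in the same order. (Sorting the SKU strings is not
            //    enough, because two SKUs can map to stripes in the opposite order.)
            Iterable<Lock> orderedLocks = locks.bulkGet(items.keySet());
            // 2. Acquire all the locks in that order
            List<Lock> acquiredLocks = new ArrayList<>();
            try {
                for (Lock lock : orderedLocks) {
                    if (!lock.tryLock(100, TimeUnit.MILLISECONDS)) {
                        // The finally block releases the locks already held
                        throw new LockTimeoutException("Timed out acquiring the inventory locks");
                    }
                    acquiredLocks.add(lock);
                }
                // 3. Check that every SKU has enough stock
                for (Map.Entry<String, Integer> item : items.entrySet()) {
                    AtomicInteger stock = inventory.get(item.getKey());
                    if (stock == null || stock.get() < item.getValue()) {
                        return false; // insufficient stock
                    }
                }
                // 4. Deduct the stock of every SKU
                for (Map.Entry<String, Integer> item : items.entrySet()) {
                    inventory.get(item.getKey()).addAndGet(-item.getValue());
                }
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CompletionException(e);
            } finally {
                // 5. Release every lock held, in reverse order (not required, but a good habit).
                //    Two SKUs may share a stripe; the locks are reentrant, so each acquisition
                //    is matched by exactly one unlock here.
                Collections.reverse(acquiredLocks);
                releaseLocks(acquiredLocks);
            }
        }, executor);
    }

    private void releaseLocks(List<Lock> locksToRelease) {
        for (Lock lock : locksToRelease) {
            lock.unlock();
        }
    }
}
```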
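Without Guava, the idea behind Striped.bulkGet can be sketched with the JDK alone: map each SKU to a stripe, then lock the distinct stripes in ascending index order, each with a timeout. This is an illustrative stand-in, not Guava's implementation. The stripe count, the timeout, the in-memory Map<String, Integer> stock and the class name StripedStock are all assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

class StripedStock {
    private final ReentrantLock[] stripes;
    // Per-SKU stock; an SKU's entry is only changed while holding that SKU's stripe lock
    private final Map<String, Integer> stock = new ConcurrentHashMap<>();

    StripedStock(int stripeCount) {
        stripes = new ReentrantLock[stripeCount];
        for (int i = 0; i < stripeCount; i++) {
            stripes[i] = new ReentrantLock();
        }
    }

    private int stripeOf(String sku) {
        return Math.floorMod(sku.hashCode(), stripes.length);
    }

    void restock(String sku, int quantity) {
        ReentrantLock lock = stripes[stripeOf(sku)];
        lock.lock();
        try {
            stock.merge(sku, quantity, Integer::sum);
        } finally {
            lock.unlock();
        }
    }

    int available(String sku) {
        return stock.getOrDefault(sku, 0);
    }

    /** All-or-nothing deduction; returns false on insufficient stock or a lock timeout. */
    boolean batchDeduct(Map<String, Integer> items, long timeoutMillis) {
        // Distinct stripe indices in ascending order: every caller locks in the same global order
        TreeSet<Integer> stripeOrder = new TreeSet<>();
        for (String sku : items.keySet()) {
            stripeOrder.add(stripeOf(sku));
        }
        List<ReentrantLock> held = new ArrayList<>();
        try {
            for (int idx : stripeOrder) {
                ReentrantLock lock = stripes[idx];
                if (!lock.tryLock(timeoutMillis, TimeUnit.MILLISECONDS)) {
                    return false; // timed out; the finally block releases what we hold
                }
                held.add(lock);
            }
            // Check every SKU first...
            for (Map.Entry<String, Integer> item : items.entrySet()) {
                if (available(item.getKey()) < item.getValue()) {
                    return false;
                }
            }
            // ...then deduct every SKU
            for (Map.Entry<String, Integer> item : items.entrySet()) {
                stock.merge(item.getKey(), -item.getValue(), Integer::sum);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Release in reverse acquisition order
            for (int i = held.size() - 1; i >= 0; i--) {
                held.get(i).unlock();
            }
        }
    }
}
```

A failed batch leaves every SKU untouched, because all checks run before any deduction while every relevant stripe is held.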
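Lessons learned: my multithreading checklist
Safety checklist
- Have deadlocks been ruled out? (lock ordering, timeouts)
- Is lock reentrancy handled correctly?
- Are locks always released in a finally block?
- Is thread interruption handled?
- Do shared variables have proper visibility guarantees? (volatile / atomic classes)
Performance checklist
- Are the locks fine-grained enough? (don't lock a whole method)
- Are read-write locks used for read-mostly workloads?
- Is the thread pool configured sensibly? (core size, queue size, rejection policy)
- Is lock contention monitored?
- Has CPU cache-line false sharing been considered?
Robustness checklist
- Are there timeouts to prevent blocking forever?
- Is exception handling complete? (exceptions in pool tasks, InterruptedException)
- Is resource cleanup reliable? (pool shutdown, connection release)
- Is there a degradation strategy? (handling of rejected tasks)
Maintainability checklist
- Do threads have meaningful names? (for debugging)
- Are key operations logged sufficiently?
- Are monitoring and alerting in place?
- Is the code easy to test? (injectable, mockable)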
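The final truth
That deadlock incident taught me this:
Multithreaded programming is not about making code run faster; it is about making a system work correctly under concurrency.
The principles I follow now:
- Prefer high-level concurrency tools: BlockingQueue, ConcurrentHashMap, CompletableFuture
- Avoid creating threads by hand: use thread pools instead of new Thread()
- Keep lock scope minimal: lock only what is necessary, and release it as soon as possible
- Always think about timeouts: a wait without a timeout is dangerous
- Monitoring matters more than optimization: first find out what is happening, then decide how to optimize
Remember: in the concurrent world, correctness always comes before performance. A fast bug is still a bug, while a correct but slow system at least works. Make it correct first, then make it fast.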